您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 11341|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;; h. B- J9 p  p$ D) k2 Z
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    ! e" A: C; M2 c

  3. : _  U1 B4 Y' u1 @+ R
  4. $worker = new Worker();
    & W- y3 x+ s- I) O( i2 W, l; w
  5. // 4个进程6 m! q7 L$ X) v/ \
  6. $worker->count = 4;7 O0 E! Q; \7 x! W+ R4 F' h
  7. // 每个进程启动后在当前进程新增一个Worker监听( o# @. ]# \# ], j6 P% v
  8. $worker->onWorkerStart = function($worker); U# \3 \: h6 T/ t" E$ h0 h7 s9 R
  9. {7 X* ~. J( a+ p
  10.     /**6 K+ Z3 y2 e2 ~) Q/ P
  11.      * 4个进程启动的时候都创建2016端口的Worker$ m: ]& u4 h$ Y
  12.      * 当执行到worker->listen()时会报Address already in use错误. n3 o- M6 G$ Q5 `& q
  13.      * 如果worker->count=1则不会报错
    3 @& h1 N, k% H4 W: x. g0 Y6 E9 S1 z
  14.      */
      c# Y# G& ~" f: Q! x  q& ~
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');0 i$ Z$ e* c4 T4 S- k
  16.     $inner_worker->onMessage = 'on_message';+ A/ {' }7 H, |; D
  17.     // 执行监听。这里会报Address already in use错误0 N6 s  L$ K) \+ b
  18.     $inner_worker->listen();/ P4 N3 h8 G6 }
  19. };  ?# }1 {  h4 h( e# I0 ~( s8 ]

  20. ' L, R" F) \( j, b& b) f; t( A
  21. $worker->onMessage = 'on_message';+ z' P2 N: \" ]& Y% h  ~3 C1 ~

  22. ; f, }. O5 I" q& P
  23. function on_message($connection, $data)* H+ [  Q; ^" O; C2 C
  24. {8 W( B, W* a  A  W5 E/ d9 d$ c8 \1 g
  25.     $connection->send("hello\n");
    3 O8 z" G# R9 n  ?& ^4 N% z0 e
  26. }- l) Z+ J$ P5 }5 Q. N5 d+ i

  27. % y* v) f: i8 I, h) Q+ z& l
  28. // 运行worker5 L+ r0 s& H( [4 f0 S
  29. Worker::runAll();2 C, s* {, s3 p& [1 u8 f6 `# F
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    + q) z% u! C5 I' t
  31. ) V- w7 v& t- z# @4 n0 a) b
  32. use Workerman\Worker;
    2 O6 b8 [; K! q% w  H/ k/ W2 {; X: s
  33. require_once './Workerman/Autoloader.php';( U8 z: ~; d. v" l

  34. , u4 y" L, ^% r$ A& b9 Z5 d' g
  35. $worker = new Worker('text://0.0.0.0:2015');# R4 _4 X7 q( _* v1 Y
  36. // 4个进程, ^% w, a2 q9 a7 _. U! M! n
  37. $worker->count = 4;
    - L/ y- ^& k8 O: m3 ^0 [: p
  38. // 每个进程启动后在当前进程新增一个Worker监听
    ) i$ y5 d& i* s# K- i
  39. $worker->onWorkerStart = function($worker)
    ) ~2 x8 g. E1 p; ]/ W
  40. {" d2 |" g" X5 l" q1 G. l2 E# P
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    . n( c; m8 H! T
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)% y# Q! v9 h/ V! C$ a6 T
  43.     $inner_worker->reusePort = true;
    6 ?/ ?% U) E$ {- M* E
  44.     $inner_worker->onMessage = 'on_message';
    ' m' V# L; G: e8 e. q
  45.     // 执行监听。正常监听不会报错
    7 s# B9 o6 ^) `' z; G
  46.     $inner_worker->listen();
    ; \# F) @: J* d0 m7 k
  47. };
    3 ^; p$ {. L4 p6 G
  48. ' c5 P( C9 o7 m& u$ u$ `' \
  49. $worker->onMessage = 'on_message';
    ( `) }8 W9 v. [6 |. M# }7 z

  50. & N  i2 b# w3 [$ U4 c- ^0 Y; Q
  51. function on_message($connection, $data)
    8 M) U( k; o5 H
  52. {' W* e/ c7 @2 b- q( {/ U
  53.     $connection->send("hello\n");% j0 V5 A: m0 E5 z3 D, X; u2 z
  54. }' s) z8 D& n% E9 H8 O; f
  55. 6 l2 \0 a0 p3 O3 G1 e
  56. // 运行worker
    4 m0 S2 c" Q/ m$ _# ^
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    ) U4 N$ P- K8 e$ H% {0 ?
  2. use Workerman\Worker;8 b" [# N1 S: A" X
  3. require_once './Workerman/Autoloader.php';/ d: r0 m; K2 ]5 D! I
  4. // 初始化一个worker容器,监听1234端口2 u6 A* e0 F1 c( {% O
  5. $worker = new Worker('websocket://0.0.0.0:1234');) h. {, e% ~! t: _( l& t( r
  6. ; M" x/ G' T: C' g; \: g' \8 P# q: L
  7. /*
    , J0 t4 l  \* O$ i, @" b, i# I6 k
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
      Y& ?) w. |0 T1 |
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    7 p* O' ~; F3 p4 c: a
  10. */
    / F, _) x# ^) [7 n+ X
  11. $worker->count = 1;6 l' e5 m& H  B. B. e' m. i. ?& z
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    * C/ Z! a" N% h' L6 C- G7 j7 W
  13. $worker->onWorkerStart = function($worker)
    ( L, [7 T$ \  a: T: a
  14. {# D7 R3 }" E# S9 O& n
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符4 o8 h, T! z. j; q: @/ H+ J9 u) E
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    6 e$ p7 b% Z! \( P
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    & B0 |5 x1 c: X: \7 H2 P5 ]* u
  18.     {
    ) [6 R- w4 R  p0 z0 m
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据- ]) f: L% G+ P2 v+ `) A
  20.         $data = json_decode($buffer, true);
    - U: ^7 l: M% n
  21.         $uid = $data['uid'];9 J$ y. `+ G: d7 |. k+ b
  22.         // 通过workerman,向uid的页面推送数据
    0 ^7 Y; i3 J! ^* o# G0 f3 n
  23.         $ret = sendMessageByUid($uid, $buffer);. Q& C  W% h5 Z" j
  24.         // 返回推送结果+ ?# V. u7 y, C4 \2 h. a3 H
  25.         $connection->send($ret ? 'ok' : 'fail');
    4 e/ k6 p; O: H! H3 B4 P
  26.     };
    $ P( Y9 |- |+ w. Y+ q4 d
  27.     // ## 执行监听 ##
      K# L; I7 n7 u
  28.     $inner_text_worker->listen();
    , n" i. V6 y7 Y4 R) n, d
  29. };
    " o8 J% M. t) ?  o
  30. // 新增加一个属性,用来保存uid到connection的映射' p' r6 g4 t/ V
  31. $worker->uidConnections = array();- `+ l4 ^# ~+ U$ ^7 m
  32. // 当有客户端发来消息时执行的回调函数- c' D) m% M* D( J* I% _( C; X
  33. $worker->onMessage = function($connection, $data)
    ' u  f% h, U1 ?4 j/ m
  34. {
    0 B1 S, [: {; q6 r- d& q' g
  35.     global $worker;
    - D2 j) ^* B0 J; Q, V
  36.     // 判断当前客户端是否已经验证,既是否设置了uid3 o) \: w0 h2 {
  37.     if(!isset($connection->uid))3 a) [+ B3 u* ?. Y6 q& K# L
  38.     {2 m; z6 X6 y! v5 ]( v
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    ! C  l9 B& c2 y: c% G
  40.        $connection->uid = $data;4 ^5 t8 f1 o8 m7 U( V: z4 g$ C0 M
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,# Y6 Z9 C2 ]0 w7 [
  42.         * 实现针对特定uid推送数据& {6 ^5 \  a( E! a& q
  43.         */9 ]& s( z4 J$ d# l* w
  44.        $worker->uidConnections[$connection->uid] = $connection;
    . b: ]( T$ [6 q7 V9 C1 w! S
  45.        return;
    . M3 w# r8 w9 ?! F3 P
  46.     }
    ' M. J& r5 w3 v6 N
  47. };$ U# L1 ~9 g0 ^
  48. : W+ J0 P5 Z/ Y# L
  49. // 当有客户端连接断开时
    1 @8 Z- |5 p8 `  F$ y3 y
  50. $worker->onClose = function($connection)
    - S8 p3 x7 l2 [$ f) {6 x
  51. {
      A' N" ]1 h1 c5 Z
  52.     global $worker;
    6 z+ a6 F( @. ~& z! Y* [1 W
  53.     if(isset($connection->uid)): E$ B& J, n  E* H- F( n. r
  54.     {
    ) u% t! _; n* S& C# M8 c
  55.         // 连接断开时删除映射
    6 O$ @# Z! p( D; N+ A& X
  56.         unset($worker->uidConnections[$connection->uid]);( `' D: M1 S+ L+ w$ K2 L5 N  G
  57.     }" p* f6 H' }/ @0 H6 t
  58. };4 _( f& I$ @% l2 |7 x8 }8 \: n

  59. 8 D+ `- Q" U; }$ h, Y$ Q. l
  60. // 向所有验证的用户推送数据$ H. P! C8 v5 d, y
  61. function broadcast($message): G- C2 b) p* _# h
  62. {
    : o6 y5 L3 ^8 [% \: Q
  63.    global $worker;
    " N, h( `% I" q% I5 ?5 g
  64.    foreach($worker->uidConnections as $connection)9 [" Z5 h! I& F
  65.    {% \. B/ j# K1 x# v# n/ w
  66.         $connection->send($message);
    , M3 P1 C. N% g  [) B
  67.    }
      ]- \/ f! N2 [4 f9 w
  68. }6 ?' ]" b) ?* a9 Z- P9 X

  69. 3 ]. d5 |4 J3 l# c5 m' \' d" z
  70. // 针对uid推送数据
    * m" m  U% ~' i$ u0 Z; U
  71. function sendMessageByUid($uid, $message)
    ! b: s& ~( S8 r+ P1 i2 i0 E
  72. {
    9 K0 Y# z; L6 b; q
  73.     global $worker;
    ' c4 X+ T: ^* D4 i; P
  74.     if(isset($worker->uidConnections[$uid]))
    6 h, Z0 I& M  ~  {
  75.     {
    # E  `1 A7 S. x$ i( }" t
  76.         $connection = $worker->uidConnections[$uid];$ ~, W, k* E' j5 E; U' {
  77.         $connection->send($message);
    + u9 b( j- L" b/ Z$ k
  78.         return true;
    6 g% ^. [" U! m% U1 P) W9 y
  79.     }' g. p3 d& |0 S3 f9 B
  80.     return false;
    $ P& l; b$ |6 U6 r* a
  81. }+ V: r* h/ X4 y  ?  {1 S$ o
  82. 0 g/ }  V4 ]- z* T# z
  83. // 运行所有的worker1 C- k9 P- F8 ]4 X
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');) I! N6 p( _) Z* a
  2. ws.onopen = function(){$ C' n. Y$ y. |9 Q! u
  3.     var uid = 'uid1';
    & C' R$ ~1 D$ A: U! z2 Z
  4.     ws.send(uid);, g: |6 S1 L$ U% K3 g. n
  5. };
    " H4 c  m' o4 h
  6. ws.onmessage = function(e){8 I' M* c: N+ Y2 w; V
  7.     alert(e.data);
    6 W( J; _9 \4 p1 n! D- l4 T
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    ! B' ?8 t" _7 g) F
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);$ L" q- y& Y/ J. Z# Y
  3. // 推送的数据,包含uid字段,表示是给这个uid推送, u3 P; [, m/ K7 f/ u7 @
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    $ q8 d7 G( e' j5 A6 n+ |# `0 @
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    * J7 N4 X! L( U4 J2 o9 w
  6. fwrite($client, json_encode($data)."\n");: i- s4 ~! N) g1 i0 K  V6 ]5 s
  7. // 读取推送结果
    # }; \1 C2 z$ {! m& }8 |
  8. echo fread($client, 8192);
复制代码

- x4 e6 _, v& ?% }" H* i. ?, m7 N  w9 n5 r
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-9-29 05:17 , Processed in 0.123792 second(s), 22 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!